package daemon

import (
	"bufio"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"os"
	"os/exec"
	"os/signal"
	"path/filepath"
	"strings"
	"sync"
	"syscall"
	"time"

	ign3types "github.com/coreos/ignition/v2/config/v3_2/types"
	"github.com/golang/glog"
	"github.com/pkg/errors"
	"golang.org/x/time/rate"
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/strategicpatch"
	"k8s.io/apimachinery/pkg/util/wait"
	coreinformersv1 "k8s.io/client-go/informers/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/scheme"
	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
	corev1lister "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	clientretry "k8s.io/client-go/util/retry"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/kubectl/pkg/drain"

	configv1 "github.com/openshift/api/config/v1"
	mcoResourceRead "github.com/openshift/machine-config-operator/lib/resourceread"
	mcfgv1 "github.com/openshift/machine-config-operator/pkg/apis/machineconfiguration.openshift.io/v1"
	commonconstants "github.com/openshift/machine-config-operator/pkg/constants"
	ctrlcommon "github.com/openshift/machine-config-operator/pkg/controller/common"
	"github.com/openshift/machine-config-operator/pkg/daemon/constants"
	mcfginformersv1 "github.com/openshift/machine-config-operator/pkg/generated/informers/externalversions/machineconfiguration.openshift.io/v1"
	mcfglistersv1 "github.com/openshift/machine-config-operator/pkg/generated/listers/machineconfiguration.openshift.io/v1"
)

// Daemon is the dispatch point for the functions of the agent on the
// machine. it keeps track of connections and the current state of the update
// process.
type Daemon struct {
	// name is the node name.
	name string

	// os the operating system the MCD is running on
	os OperatingSystem

	// mock is set if we're running as non-root, probably under unit tests
	mock bool

	// NodeUpdaterClient an instance of the client which interfaces with host content deployments
	NodeUpdaterClient NodeUpdaterClient

	// bootID is a unique value per boot (generated by the kernel)
	bootID string

	// bootedOSImageURL is the currently booted URL of the operating system
	bootedOSImageURL string

	// kubeClient allows interaction with Kubernetes, including the node we are running on.
	kubeClient kubernetes.Interface

	// recorder sends events to the apiserver
	recorder record.EventRecorder

	// nodeLister is used to watch for updates via the informer
	nodeLister       corev1lister.NodeLister
	nodeListerSynced cache.InformerSynced

	// mcLister lists MachineConfigs from the shared informer cache
	mcLister       mcfglistersv1.MachineConfigLister
	mcListerSynced cache.InformerSynced

	// skipReboot skips the reboot after a sync, only valid with onceFrom != ""
	skipReboot bool

	// kubeletHealthzEnabled/kubeletHealthzEndpoint configure the optional
	// kubelet healthz polling loop (see runKubeletHealthzMonitor)
	kubeletHealthzEnabled  bool
	kubeletHealthzEndpoint string

	// updateActive is true while an update is in progress; read by the
	// SIGTERM handler and guarded by updateActiveLock
	updateActive     bool
	updateActiveLock sync.Mutex

	// nodeWriter performs the writes (annotations, status) to our node object
	nodeWriter NodeWriter

	// channel used by callbacks to signal Run() of an error
	exitCh chan<- error

	// channel used to ensure all spawned goroutines exit when we exit.
	stopCh <-chan struct{}

	// node is the current instance of the node being processed through handleNodeEvent
	// or the very first instance grabbed when the daemon starts
	node *corev1.Node

	// queue rate-limits node syncs; enqueueNode and syncHandler are set as
	// indirections in ClusterConnect (enqueueDefault / syncNode)
	queue       workqueue.RateLimitingInterface
	enqueueNode func(*corev1.Node)
	syncHandler func(node string) error

	// isControlPlane is true if this node is a control plane (master).
	// The machine may also be a worker (with schedulable masters).
	isControlPlane bool
	// nodeInitialized is true when we've performed one-time initialization
	// after having updated the node object
	nodeInitialized bool
	// booting is true when all initial synchronization to the target
	// machineconfig is done
	booting bool

	// currentConfigPath is where the on-disk copy of the current config is kept
	currentConfigPath string

	// loggerSupportsJournal records whether logger(1) accepts --journald
	// (false on RHEL 7.6 / CentOS 7 era util-linux)
	loggerSupportsJournal bool

	// drainer evicts/deletes pods when the node must be drained
	drainer *drain.Helper

	// Config Drift Monitor
	configDriftMonitor ConfigDriftMonitor
}

const (
	// pathSystemd is the path systemd modifiable units, services, etc.. reside
	pathSystemd = "/etc/systemd/system"
	// pathDevNull is the system's path to an endless blackhole
	pathDevNull = "/dev/null"
	// currentConfigPath is where we store the current config on disk to validate
	// against annotations changes
	currentConfigPath = "/etc/machine-config-daemon/currentconfig"
	// pendingStateMessageID is the id we store the pending state in journal. We use it to
	// also retrieve the pending config after a reboot
	pendingStateMessageID = "machine-config-daemon-pending-state"

	// kubeletHealthzPollingInterval is how often we probe the kubelet healthz endpoint
	kubeletHealthzPollingInterval = 30 * time.Second
	// kubeletHealthzTimeout bounds a single healthz HTTP request
	kubeletHealthzTimeout = 30 * time.Second

	// updateDelay is the baseline speed at which we react to changes.  We don't
	// need to react in milliseconds as any change would involve rebooting the node.
	// Having this be relatively high limits the number of times we retry before
	// the MCC/MCO will time out.  We don't want to spam our logs with the same
	// error.
	updateDelay = 5 * time.Second

	// maxUpdateBackoff is the maximum time to react to a change as we back off
	// in the face of errors.
	maxUpdateBackoff = 60 * time.Second
)

// onceFromOrigin identifies where a "once from" config was loaded from.
type onceFromOrigin int

const (
	onceFromUnknownConfig onceFromOrigin = iota
	onceFromLocalConfig
	onceFromRemoteConfig
)

var (
	// defaultRebootTimeout is the default bound applied to reboot handling
	// (consumed elsewhere in this package; usage not visible in this chunk).
	defaultRebootTimeout = 24 * time.Hour
)

// rebootCommand creates a new transient systemd unit to reboot the system.
// We explicitly try to stop kubelet.service first, before anything else; this
// way we ensure the rest of system stays running, because kubelet may need
// to do "graceful" shutdown by e.g. de-registering with a load balancer.
// However note we use `;` instead of `&&` so we keep rebooting even
// if kubelet failed to shutdown - that way the machine will still eventually reboot
// as systemd will time out the stop invocation.
func rebootCommand(rationale string) *exec.Cmd {
	description := fmt.Sprintf("machine-config-daemon: %s", rationale)
	shellScript := "systemctl stop kubelet.service; systemctl reboot"
	return exec.Command(
		"systemd-run",
		"--unit", "machine-config-daemon-reboot",
		"--description", description,
		"/bin/sh", "-c", shellScript,
	)
}

// getBootID loads the unique "boot id" which is generated by the Linux kernel.
func getBootID() (string, error) {
	raw, err := ioutil.ReadFile("/proc/sys/kernel/random/boot_id")
	if err != nil {
		return "", err
	}
	bootID := strings.TrimSpace(string(raw))
	return bootID, nil
}

// New sets up the systemd and kubernetes connections needed to update the
// machine.
func New(
	nodeUpdaterClient NodeUpdaterClient,
	exitCh chan<- error,
) (*Daemon, error) {
	// Running as non-root means we're mocked, probably under unit tests.
	mock := os.Getuid() != 0

	var (
		osImageURL string
		osVersion  string
		err        error
	)

	hostos := OperatingSystem{}
	if !mock {
		hostos, err = GetHostRunningOS()
		if err != nil {
			HostOS.WithLabelValues("unsupported", "").Set(1)
			return nil, errors.Wrapf(err, "checking operating system")
		}
	}

	// Only pull the osImageURL from OSTree when we are on RHCOS or FCOS
	if hostos.IsCoreOSVariant() {
		if err := nodeUpdaterClient.Initialize(); err != nil {
			return nil, fmt.Errorf("error initializing rpm-ostree: %v", err)
		}
		osImageURL, osVersion, err = nodeUpdaterClient.GetBootedOSImageURL()
		if err != nil {
			return nil, fmt.Errorf("error reading osImageURL from rpm-ostree: %v", err)
		}
		glog.Infof("Booted osImageURL: %s (%s)", osImageURL, osVersion)
	}

	bootID := ""
	if !mock {
		if bootID, err = getBootID(); err != nil {
			return nil, errors.Wrapf(err, "failed to read boot ID")
		}
	}

	// RHEL 7.6/Centos 7 logger (util-linux) doesn't have the --journald flag
	loggerSupportsJournal := true
	if !mock && hostos.IsLikeTraditionalRHEL7() {
		helpOutput, err := exec.Command("logger", "--help").CombinedOutput()
		if err != nil {
			return nil, errors.Wrapf(err, "running logger --help")
		}
		loggerSupportsJournal = strings.Contains(string(helpOutput), "--journald")
	}

	// report OS & version (if RHCOS or FCOS) to prometheus
	HostOS.WithLabelValues(hostos.ToPrometheusLabel(), osVersion).Set(1)

	return &Daemon{
		mock:                  mock,
		booting:               true,
		os:                    hostos,
		NodeUpdaterClient:     nodeUpdaterClient,
		bootedOSImageURL:      osImageURL,
		bootID:                bootID,
		exitCh:                exitCh,
		currentConfigPath:     currentConfigPath,
		loggerSupportsJournal: loggerSupportsJournal,
		configDriftMonitor:    NewConfigDriftMonitor(),
	}, nil
}

// ClusterConnect sets up the systemd and kubernetes connections needed to update the
// machine.
//
// It wires the daemon into the cluster: starts the node writer, builds the
// rate-limited work queue and event recorder, registers node informer
// handlers, starts the login monitor, and configures the drain helper.
func (dn *Daemon) ClusterConnect(
	name string,
	kubeClient kubernetes.Interface,
	mcInformer mcfginformersv1.MachineConfigInformer,
	nodeInformer coreinformersv1.NodeInformer,
	kubeletHealthzEnabled bool,
	kubeletHealthzEndpoint string,
) {
	dn.name = name
	dn.kubeClient = kubeClient

	// The node writer runs on its own goroutine until stopCh closes.
	dn.nodeWriter = newNodeWriter()
	go dn.nodeWriter.Run(dn.stopCh)

	// Other controllers start out with the default controller limiter which retries
	// in milliseconds; since any change here will involve rebooting the node
	// we don't need to react in milliseconds.  See also updateDelay above.
	dn.queue = workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter(
		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(updateDelay), 1)},
		workqueue.NewItemExponentialFailureRateLimiter(1*time.Second, maxUpdateBackoff)), "machineconfigdaemon")

	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(glog.V(2).Infof)
	eventBroadcaster.StartRecordingToSink(&corev1client.EventSinkImpl{Interface: dn.kubeClient.CoreV1().Events("")})
	dn.recorder = eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: "machineconfigdaemon", Host: dn.name})

	// Watch for login sessions in the background; errors surface on exitCh.
	go dn.runLoginMonitor(dn.stopCh, dn.exitCh)

	// Both adds and updates funnel through the same node-event handler.
	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    dn.handleNodeEvent,
		UpdateFunc: func(oldObj, newObj interface{}) { dn.handleNodeEvent(newObj) },
	})
	dn.nodeLister = nodeInformer.Lister()
	dn.nodeListerSynced = nodeInformer.Informer().HasSynced
	dn.mcLister = mcInformer.Lister()
	dn.mcListerSynced = mcInformer.Informer().HasSynced

	dn.enqueueNode = dn.enqueueDefault
	dn.syncHandler = dn.syncNode

	dn.kubeletHealthzEnabled = kubeletHealthzEnabled
	dn.kubeletHealthzEndpoint = kubeletHealthzEndpoint

	// Drain settings: force-delete unmanaged pods, skip daemonsets, honor
	// each pod's grace period (-1), and cap a drain attempt at 90 seconds.
	dn.drainer = &drain.Helper{
		Client:              dn.kubeClient,
		Force:               true,
		IgnoreAllDaemonSets: true,
		DeleteEmptyDirData:  true,
		GracePeriodSeconds:  -1,
		Timeout:             90 * time.Second,
		OnPodDeletedOrEvicted: func(pod *corev1.Pod, usingEviction bool) {
			verbStr := "Deleted"
			if usingEviction {
				verbStr = "Evicted"
			}
			glog.Infof("%s pod %s/%s", verbStr, pod.Namespace, pod.Name)
		},
		Out:    writer{glog.Info},
		ErrOut: writer{glog.Error},
		Ctx:    context.TODO(),
	}
}

// writer implements io.Writer interface as a pass-through for klog.
type writer struct {
	logFunc func(args ...interface{})
}

// Write forwards string(p) to the wrapped log function and always reports
// the full slice as written.
func (w writer) Write(p []byte) (n int, err error) {
	msg := string(p)
	w.logFunc(msg)
	return len(p), nil
}

// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (dn *Daemon) worker() {
	for {
		if !dn.processNextWorkItem() {
			return
		}
	}
}

// processNextWorkItem pulls one key off the queue, syncs it, and records the
// result. It returns false only once the queue has been shut down.
func (dn *Daemon) processNextWorkItem() bool {
	key, quit := dn.queue.Get()
	if quit {
		return false
	}
	defer dn.queue.Done(key)

	dn.handleErr(dn.syncHandler(key.(string)), key)
	return true
}

// handleErr records the outcome of a sync: on success the key's rate-limit
// history is cleared, on failure the node error state is updated and the key
// is requeued with backoff.
func (dn *Daemon) handleErr(err error, key interface{}) {
	if err != nil {
		dn.updateErrorState(err)
		// This is at V(2) since the updateErrorState() call above ends up logging too
		glog.V(2).Infof("Error syncing node %v (retries %d): %v", key, dn.queue.NumRequeues(key), err)
		dn.queue.AddRateLimited(key)
		return
	}
	dn.queue.Forget(key)
}

// updateErrorState reflects a sync error onto the node: unreconcilable errors
// get their dedicated annotation, everything else marks the node degraded.
func (dn *Daemon) updateErrorState(err error) {
	if errors.Cause(err) == errUnreconcilable {
		dn.nodeWriter.SetUnreconcilable(err, dn.kubeClient.CoreV1().Nodes(), dn.nodeLister, dn.name)
		return
	}
	dn.nodeWriter.SetDegraded(err, dn.kubeClient.CoreV1().Nodes(), dn.nodeLister, dn.name)
}

// initializeNode is called the first time we get our node object; however to
// ensure we handle failures: everything called from here should be idempotent.
func (dn *Daemon) initializeNode() error {
	if dn.nodeInitialized {
		return nil
	}
	// Some parts of the MCO dispatch on whether or not we're managing a control plane node
	_, isControlPlane := dn.node.Labels[ctrlcommon.MasterLabel]
	if !isControlPlane {
		glog.Infof("Node %s is not labeled %s", dn.node.Name, ctrlcommon.MasterLabel)
	} else {
		glog.Infof("Node %s is part of the control plane", dn.node.Name)
		if err := dn.initializeControlPlane(); err != nil {
			return err
		}
		dn.isControlPlane = true
	}
	dn.nodeInitialized = true
	return nil
}

// syncNode is the work-queue handler: given a queue key it re-reads our node
// from the lister cache and drives the daemon state machine (first-boot
// handling, then update prep and trigger). Returning an error requeues the
// key with backoff via handleErr.
func (dn *Daemon) syncNode(key string) error {
	startTime := time.Now()
	glog.V(4).Infof("Started syncing node %q (%v)", key, startTime)
	defer func() {
		glog.V(4).Infof("Finished syncing node %q (%v)", key, time.Since(startTime))
	}()

	_, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	// If this isn't our node, nothing to do.  The node controller
	// handles other nodes.
	if name != dn.name {
		return nil
	}

	node, err := dn.nodeLister.Get(name)
	if apierrors.IsNotFound(err) {
		glog.V(2).Infof("node %v has been deleted", key)
		return nil
	}
	if err != nil {
		return err
	}
	// Check for Deleted Node
	if node.DeletionTimestamp != nil {
		glog.Infof("Node %s was deleted!", node.Name)
		return nil
	}

	// Deep-copy otherwise we are mutating our cache.
	node = node.DeepCopy()
	if dn.node == nil {
		// First time we've seen our node object: run one-time initialization.
		dn.node = node
		if err := dn.initializeNode(); err != nil {
			return err
		}
	} else {
		dn.node = node
	}

	// Take care of the very first sync of the MCD on a node.
	// This loads the node annotation from the bootstrap (if we're really bootstrapping)
	// and then proceeds to check the state of the node, which includes
	// finalizing an update and/or reconciling the current and desired machine configs.
	if dn.booting {
		// Be sure only the MCD is running now, disable -firstboot.service
		if err := upgradeHackFor44AndBelow(); err != nil {
			return err
		}
		if err := removeIgnitionArtifacts(); err != nil {
			return err
		}
		if err := dn.checkStateOnFirstRun(); err != nil {
			return err
		}
		// finished syncing node for the first time;
		// currently we return immediately here, although
		// I think we should change this to continue.
		dn.booting = false

		// Start the Config Drift Monitor since we're booted up.
		dn.startConfigDriftMonitor()

		return nil
	}

	// Pass to the shared update prep method
	current, desired, err := dn.prepUpdateFromCluster()
	if err != nil {
		return errors.Wrapf(err, "prepping update")
	}
	// Non-nil current/desired means there is an update to apply.
	if current != nil || desired != nil {
		// Only check for config drift if we need to update.
		if err := dn.runPreflightConfigDriftCheck(); err != nil {
			return err
		}

		if err := dn.triggerUpdateWithMachineConfig(current, desired); err != nil {
			return err
		}
	}
	glog.V(2).Infof("Node %s is already synced", node.Name)
	return nil
}

// runPreflightConfigDriftCheck validates that the on-disk state matches the
// currently applied machineconfig before an update occurs.
func (dn *Daemon) runPreflightConfigDriftCheck() error {
	// The forcefile (/run/machine-config-daemon-force) lets an admin skip
	// this check entirely.
	if forceFileExists() {
		glog.Infof("Skipping preflight config drift check; %s present", constants.MachineConfigDaemonForceFile)
		return nil
	}

	onDiskConfig, err := dn.getCurrentConfigOnDisk()
	if err != nil {
		return fmt.Errorf("could not get on-disk config: %w", err)
	}

	checkStart := time.Now()
	if err := dn.validateOnDiskState(onDiskConfig); err != nil {
		// Surface the failure as a node event before degrading.
		dn.recorder.Eventf(getNodeRef(dn.node), corev1.EventTypeWarning, "PreflightConfigDriftCheckFailed", err.Error())
		glog.Errorf("Preflight config drift check failed: %v", err)
		return configDriftErr(err)
	}
	glog.Infof("Preflight config drift check successful (took %s)", time.Since(checkStart))
	return nil
}

// enqueueDefault calls a default enqueue function
func (dn *Daemon) enqueueDefault(node *corev1.Node) {
	key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(node)
	if err == nil {
		dn.queue.AddRateLimited(key)
		return
	}
	utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", node, err))
}

const (
	// logindUnit is the systemd unit whose journal we watch for login sessions
	logindUnit = "systemd-logind.service"
	// IDs are taken from https://cgit.freedesktop.org/systemd/systemd/plain/src/systemd/sd-messages.h
	// sdMessageSessionStart is the journal MESSAGE_ID logind emits on session start
	sdMessageSessionStart = "8d45620c1a4348dbb17410da57c60c66"
)

// detectEarlySSHAccessesFromBoot annotates the node if we find a login before the daemon started up.
func (dn *Daemon) detectEarlySSHAccessesFromBoot() error {
	out, err := exec.Command("journalctl", "-b", "-o", "cat", "-u", logindUnit, "MESSAGE_ID="+sdMessageSessionStart).CombinedOutput()
	if err != nil {
		return err
	}
	// Any output at all means logind recorded a session this boot.
	if len(out) == 0 {
		return nil
	}
	glog.Info("Detected a login session before the daemon took over on first boot")
	glog.Infof("Applying annotation: %v", machineConfigDaemonSSHAccessAnnotationKey)
	return dn.applySSHAccessedAnnotation()
}

// RunOnceFrom is the primary entrypoint for the non-cluster case
func (dn *Daemon) RunOnceFrom(onceFrom string, skipReboot bool) error {
	dn.skipReboot = skipReboot
	parsed, contentFrom, err := dn.senseAndLoadOnceFrom(onceFrom)
	if err != nil {
		glog.Warningf("Unable to decipher onceFrom config type: %s", err)
		return err
	}
	if ignCfg, ok := parsed.(ign3types.Config); ok {
		glog.V(2).Info("Daemon running directly from Ignition")
		return dn.runOnceFromIgnition(ignCfg)
	}
	if mc, ok := parsed.(mcfgv1.MachineConfig); ok {
		glog.V(2).Info("Daemon running directly from MachineConfig")
		return dn.runOnceFromMachineConfig(mc, contentFrom)
	}
	return errors.New("unsupported onceFrom type provided")
}

// RunFirstbootCompleteMachineconfig is run via systemd on the first boot
// to complete processing of the target MachineConfig.
func (dn *Daemon) RunFirstbootCompleteMachineconfig() error {
	data, err := ioutil.ReadFile(constants.MachineConfigEncapsulatedPath)
	if err != nil {
		return err
	}
	var mc mcfgv1.MachineConfig
	err = json.Unmarshal(data, &mc)
	if err != nil {
		return errors.Wrapf(err, "failed to parse MachineConfig")
	}

	// Start with an empty config, then add our *booted* osImageURL to
	// it, reflecting the current machine state.
	oldConfig := canonicalizeEmptyMC(nil)
	oldConfig.Spec.OSImageURL = dn.bootedOSImageURL
	// Currently, we generally expect the bootimage to be older, but in the special
	// case of having bootimage == machine-os-content, and no kernel arguments
	// specified, then we don't need to do anything here.
	mcDiffNotEmpty, err := dn.compareMachineConfig(oldConfig, &mc)
	if err != nil {
		return errors.Wrapf(err, "failed to compare MachineConfig")
	}
	if !mcDiffNotEmpty {
		// No work to do: the booted state already matches the target config.
		// Removing this file signals completion of the initial MC processing.
		if err := os.Remove(constants.MachineConfigEncapsulatedPath); err != nil {
			return errors.Wrapf(err, "failed to remove %s", constants.MachineConfigEncapsulatedPath)
		}
		return nil
	}

	// Apply the config without rebooting; we reboot explicitly below after
	// the encapsulated file has been moved out of the way.
	dn.skipReboot = true
	err = dn.update(nil, &mc)
	if err != nil {
		return err
	}

	// Renaming this file to .bak signals completion of the initial MC processing.
	if err := os.Rename(constants.MachineConfigEncapsulatedPath, constants.MachineConfigEncapsulatedBakPath); err != nil {
		return errors.Wrap(err, "failed to rename encapsulated MachineConfig after processing on firstboot")
	}

	dn.skipReboot = false
	return dn.reboot(fmt.Sprintf("Completing firstboot provisioning to %s", mc.GetName()))
}

// InstallSignalHandler installs the handler for the signals the daemon should act on
func (dn *Daemon) InstallSignalHandler(signaled chan struct{}) {
	termChan := make(chan os.Signal, 2048)
	signal.Notify(termChan, syscall.SIGTERM)

	// Catch SIGTERM - if we're actively updating, we should avoid
	// having the process be killed.
	// https://github.com/openshift/machine-config-operator/issues/407
	go func() {
		// Only SIGTERM is registered above, so every delivery is a SIGTERM.
		for range termChan {
			dn.updateActiveLock.Lock()
			updating := dn.updateActive
			dn.updateActiveLock.Unlock()
			if !updating {
				close(signaled)
				return
			}
			glog.Info("Got SIGTERM, but actively updating")
		}
	}()
}

// Run finishes informer setup and then blocks, and the informer will be
// responsible for triggering callbacks to handle updates. Successful
// updates shouldn't return, and should just reboot the node.
func (dn *Daemon) Run(stopCh <-chan struct{}, exitCh <-chan error) error {
	dn.logSystem("Starting to manage node: %s", dn.name)
	dn.LogSystemData()

	glog.Info("Starting MachineConfigDaemon")
	defer glog.Info("Shutting down MachineConfigDaemon")

	signaled := make(chan struct{})
	dn.InstallSignalHandler(signaled)

	if dn.kubeletHealthzEnabled {
		glog.Info("Enabling Kubelet Healthz Monitor")
		go dn.runKubeletHealthzMonitor(stopCh, dn.exitCh)
	}

	defer utilruntime.HandleCrash()
	defer dn.queue.ShutDown()

	// Don't process queue items until the informer caches are warm.
	if !cache.WaitForCacheSync(stopCh, dn.nodeListerSynced, dn.mcListerSynced) {
		return errors.New("failed to sync initial listers cache")
	}

	// Single worker goroutine: node syncs are serialized.
	go wait.Until(dn.worker, time.Second, stopCh)

	for {
		select {
		case <-stopCh:
			return nil
		case <-signaled:
			// SIGTERM arrived while no update was active; exit cleanly.
			return nil
		case err := <-exitCh:
			// This channel gets errors from auxiliary goroutines like loginmonitor and kubehealth
			glog.Warningf("Got an error from auxiliary tools: %v", err)
		}
	}
}

// runLoginMonitor follows the logind journal for session-start messages and
// annotates the node whenever a new login (SSH or console) is detected.
// It blocks until stopCh closes; errors are reported on exitCh.
func (dn *Daemon) runLoginMonitor(stopCh <-chan struct{}, exitCh chan<- error) {
	cmd := exec.Command("journalctl", "-b", "-f", "-o", "cat", "-u", logindUnit, "MESSAGE_ID="+sdMessageSessionStart)
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		exitCh <- err
		return
	}
	if err := cmd.Start(); err != nil {
		exitCh <- err
		return
	}
	worker := make(chan struct{})
	go func() {
		for {
			select {
			case <-worker:
				return
			default:
				buf := make([]byte, 1024)
				l, err := stdout.Read(buf)
				if err != nil {
					if err == io.EOF {
						return
					}
					exitCh <- err
					return
				}
				if l > 0 {
					// Only convert the l bytes actually read; converting the
					// entire fixed-size buffer would include NUL padding in
					// the logged line.
					line := strings.Split(string(buf[:l]), "\n")[0]
					glog.Infof("Detected a new login session: %s", line)
					glog.Infof("Login access is discouraged! Applying annotation: %v", machineConfigDaemonSSHAccessAnnotationKey)
					if err := dn.applySSHAccessedAnnotation(); err != nil {
						exitCh <- err
					}
				}
			}
		}
	}()
	<-stopCh
	close(worker)
	// Best-effort teardown of the journalctl follower; there is no graceful
	// shutdown path for it.
	cmd.Process.Kill()
}

// applySSHAccessedAnnotation marks the node as having been accessed via SSH.
func (dn *Daemon) applySSHAccessedAnnotation() error {
	err := dn.nodeWriter.SetSSHAccessed(dn.kubeClient.CoreV1().Nodes(), dn.nodeLister, dn.name)
	if err == nil {
		return nil
	}
	return fmt.Errorf("error: cannot apply annotation for SSH access due to: %v", err)
}

// onConfigDrift is called whenever the on-disk config has drifted from the
// current machineconfig: it emits a node event, logs, and degrades the node.
func (dn *Daemon) onConfigDrift(err error) {
	nodeRef := getNodeRef(dn.node)
	dn.recorder.Eventf(nodeRef, corev1.EventTypeWarning, "ConfigDriftDetected", err.Error())
	glog.Error(err)
	dn.updateErrorState(err)
}

// startConfigDriftMonitor starts the Config Drift Monitor against the on-disk
// current config and spawns a goroutine that stops it when either the daemon
// or the monitor itself shuts down. Safe to call repeatedly.
func (dn *Daemon) startConfigDriftMonitor() {
	// Even though the Config Drift Monitor object ensures that only a single
	// Config Drift Watcher is running at any given time, other things, such as
	// emitting Kube events on startup, should only occur if we weren't
	// previously running. This provides us with a way to short-circuit that path
	// if we already have a Config Drift Watcher running.
	if dn.configDriftMonitor.IsRunning() {
		return
	}

	currentConfig, err := dn.getCurrentConfigOnDisk()
	// Note: a read failure is only fatal on CoreOS variants; elsewhere we
	// continue with a nil config.
	if err != nil && dn.os.IsCoreOSVariant() {
		dn.exitCh <- fmt.Errorf("could not get current config from disk: %w", err)
		return
	}

	opts := ConfigDriftMonitorOpts{
		OnDrift:       dn.onConfigDrift,
		SystemdPath:   pathSystemd,
		ErrChan:       dn.exitCh,
		MachineConfig: currentConfig,
	}

	if err := dn.configDriftMonitor.Start(opts); err != nil {
		dn.exitCh <- fmt.Errorf("could not start Config Drift Monitor: %w", err)
		return
	}

	dn.recorder.Eventf(getNodeRef(dn.node), corev1.EventTypeNormal, "ConfigDriftMonitorStarted",
		"Config Drift Monitor started, watching against %s", currentConfig.Name)

	go func() {
		// Common shutdown function
		shutdown := func() {
			// Stop the Config Drift Monitor, if it's not already stopped.
			dn.configDriftMonitor.Stop()
			// Report that we've shut down
			dn.recorder.Eventf(getNodeRef(dn.node), corev1.EventTypeNormal, "ConfigDriftMonitorStopped", "Config Drift Monitor stopped")
		}

		for {
			select {
			case <-dn.stopCh:
				// We got a stop signal from outside the MCD.
				shutdown()
				return
			case <-dn.configDriftMonitor.Done():
				// We got a stop signal from the Config Drift Monitor.
				shutdown()
				return
			}
		}
	}()
}

// stopConfigDriftMonitor stops the Config Drift Monitor.
func (dn *Daemon) stopConfigDriftMonitor() {
	dn.configDriftMonitor.Stop()
}

// runKubeletHealthzMonitor periodically probes the kubelet healthz endpoint
// until stopCh closes, publishing the consecutive-failure count to the
// KubeletHealthState metric and reporting failures on exitCh.
func (dn *Daemon) runKubeletHealthzMonitor(stopCh <-chan struct{}, exitCh chan<- error) {
	failureCount := 0
	KubeletHealthState.Set(float64(failureCount))
	for {
		select {
		case <-stopCh:
			return
		case <-time.After(kubeletHealthzPollingInterval):
			if err := dn.getHealth(); err != nil {
				failureCount++
				exitCh <- fmt.Errorf("kubelet health check has failed %d times: %v", failureCount, err)
			} else {
				// reset failure count on success
				failureCount = 0
			}
			KubeletHealthState.Set(float64(failureCount))
		}
	}
}

// getHealth performs one GET against the kubelet healthz endpoint. A non-"ok"
// body is only logged as a warning; an error is returned solely for
// request/transport failures.
func (dn *Daemon) getHealth() error {
	glog.V(2).Info("Kubelet health running")
	ctx, cancel := context.WithTimeout(context.Background(), kubeletHealthzTimeout)
	defer cancel()

	req, err := http.NewRequest("GET", dn.kubeletHealthzEndpoint, nil)
	if err != nil {
		return err
	}

	resp, err := (&http.Client{}).Do(req.WithContext(ctx))
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	body, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return err
	}

	if string(body) == "ok" {
		glog.V(2).Info("Kubelet health ok")
		return nil
	}

	glog.Warningf("Kubelet Healthz Endpoint returned: %s", string(body))
	return nil
}

// stateAndConfigs is the "state" node annotation plus parsed machine configs
// referenced by the currentConfig and desiredConfig annotations.  If we have
// a "pending" config (we're coming up after a reboot attempting to apply a config),
// we'll load that as well - otherwise it will be nil.
//
// If any of the object names are the same, they will be pointer-equal.
type stateAndConfigs struct {
	// bootstrapping is true when the MCS-written initial node annotation file exists
	bootstrapping bool
	// state is the machine-config-daemon state annotation value
	state string
	// currentConfig is the MachineConfig named by the current-config annotation
	currentConfig *mcfgv1.MachineConfig
	// pendingConfig is the config we were applying before reboot, or nil
	pendingConfig *mcfgv1.MachineConfig
	// desiredConfig is the MachineConfig named by the desired-config annotation
	desiredConfig *mcfgv1.MachineConfig
}

// getStateAndConfigs assembles a stateAndConfigs from the node annotations
// and the MachineConfig lister, reusing pointers when the current, desired,
// and pending names coincide. It also records the current MCD state to the
// MCDState metric.
func (dn *Daemon) getStateAndConfigs(pendingConfigName string) (*stateAndConfigs, error) {
	_, err := os.Lstat(constants.InitialNodeAnnotationsFilePath)
	var bootstrapping bool
	if err != nil {
		if !os.IsNotExist(err) {
			return nil, err
		}
		// The node annotation file (laid down by the MCS)
		// doesn't exist, we must not be bootstrapping
	} else {
		bootstrapping = true
		glog.Info("In bootstrap mode")
	}

	currentConfigName, err := getNodeAnnotation(dn.node, constants.CurrentMachineConfigAnnotationKey)
	if err != nil {
		return nil, err
	}
	desiredConfigName, err := getNodeAnnotation(dn.node, constants.DesiredMachineConfigAnnotationKey)
	if err != nil {
		return nil, err
	}
	currentConfig, err := dn.mcLister.Get(currentConfigName)
	if err != nil {
		return nil, err
	}
	// The state annotation is optional (third arg true = allow missing).
	state, err := getNodeAnnotationExt(dn.node, constants.MachineConfigDaemonStateAnnotationKey, true)
	if err != nil {
		return nil, err
	}
	// Temporary hack: the MCS used to not write the state=done annotation
	// key.  If it's unset, let's write it now.
	if state == "" {
		state = constants.MachineConfigDaemonStateDone
	}

	var desiredConfig *mcfgv1.MachineConfig
	if currentConfigName == desiredConfigName {
		// Pointer-equal by design: see the type comment on stateAndConfigs.
		desiredConfig = currentConfig
		glog.Infof("Current+desired config: %s", currentConfigName)
	} else {
		desiredConfig, err = dn.mcLister.Get(desiredConfigName)
		if err != nil {
			return nil, err
		}

		glog.Infof("Current config: %s", currentConfigName)
		glog.Infof("Desired config: %s", desiredConfigName)
	}

	var pendingConfig *mcfgv1.MachineConfig
	// We usually expect that if current != desired, pending == desired; however,
	// it can happen that desiredConfig changed while we were rebooting.
	if pendingConfigName == desiredConfigName {
		pendingConfig = desiredConfig
	} else if pendingConfigName != "" {
		pendingConfig, err = dn.mcLister.Get(pendingConfigName)
		if err != nil {
			return nil, err
		}

		glog.Infof("Pending config: %s", pendingConfigName)
	}

	var degradedReason string
	if state == constants.MachineConfigDaemonStateDegraded {
		// Best-effort: a missing reason annotation is logged, not fatal.
		degradedReason, err = getNodeAnnotation(dn.node, constants.MachineConfigDaemonReasonAnnotationKey)
		if err != nil {
			glog.Errorf("Could not retrieve degraded reason. err: %v", err)
		}
	}

	MCDState.WithLabelValues(state, degradedReason).SetToCurrentTime()

	return &stateAndConfigs{
		bootstrapping: bootstrapping,
		currentConfig: currentConfig,
		pendingConfig: pendingConfig,
		desiredConfig: desiredConfig,
		state:         state,
	}, nil
}

// LogSystemData gathers data from the OS and adds it to our stdout; should only
// be called once on MCD startup to log things which generally shouldn't change
// dynamically after a reboot.
func (dn *Daemon) LogSystemData() {
	// Print status if available
	if dn.os.IsCoreOSVariant() {
		status, err := dn.NodeUpdaterClient.GetStatus()
		if err != nil {
			glog.Fatalf("unable to get rpm-ostree status: %s", err)
		}
		glog.Info(status)
	}

	boots, err := runGetOut("journalctl", "--list-boots")
	if err != nil {
		glog.Errorf("Listing boots: %v", err)
	}
	// Use an explicit %s verb: passing the command output concatenated into
	// the format string would misinterpret any '%' characters it contains.
	glog.Infof("journalctl --list-boots:\n%s", boots)

	// Since nothing in the cluster today watches systemd units, let's
	// at least capture them in our logs to start.  See also
	// https://github.com/openshift/machine-config-operator/issues/1365
	// This only captures units that started *before* the MCD, we need
	// to also watch dynamically of course.
	//
	// also xref https://github.com/coreos/console-login-helper-messages/blob/e8a849f4c23910e7c556c10719911cc59873fc23/usr/share/console-login-helper-messages/profile.sh
	failedServices, err := runGetOut("systemctl", "list-units", "--state=failed", "--no-legend")
	if err != nil {
		glog.Errorf("Listing failed systemd services: %v", err)
	} else if len(failedServices) > 0 {
		// Same as above: keep untrusted output out of the format string.
		glog.Infof("systemctl --failed:\n%s", failedServices)
	} else {
		glog.Info("systemd service state: OK")
	}
}

const (
	// pendingConfigPath is where transient pending-config state is persisted
	// across reboots (see writePendingConfig / getPendingConfig)
	pendingConfigPath = "/etc/machine-config-daemon/state.json"
)

// pendingConfigState is the JSON document persisted at pendingConfigPath.
type pendingConfigState struct {
	// PendingConfig is the name of the MachineConfig being applied
	PendingConfig string `json:"pendingConfig,omitempty"`
	// BootID is the kernel boot ID at the time the state was written
	BootID string `json:"bootID,omitempty"`
}

// XXX: drop this
// writePendingConfig persists the desired config name and current boot ID so
// the pending config can be recovered after a reboot.
func (dn *Daemon) writePendingConfig(desiredConfig *mcfgv1.MachineConfig) error {
	state := pendingConfigState{
		PendingConfig: desiredConfig.GetName(),
		BootID:        dn.bootID,
	}
	encoded, err := json.Marshal(&state)
	if err != nil {
		return err
	}
	return writeFileAtomicallyWithDefaults(pendingConfigPath, encoded)
}

// XXX: drop this
// we need this compatibility layer for now
//
// getPendingConfig loads the transient pending-config state written by
// writePendingConfig. It returns (nil, nil) when no state file exists.
func (dn *Daemon) getPendingConfig() (*pendingConfigState, error) {
	// Use the shared pendingConfigPath constant rather than repeating the
	// literal path, so it cannot drift from writePendingConfig.
	s, err := ioutil.ReadFile(pendingConfigPath)
	if err != nil {
		if !os.IsNotExist(err) {
			return nil, errors.Wrapf(err, "loading transient state")
		}
		// Missing file is the normal "no pending config" case.
		return nil, nil
	}
	var p pendingConfigState
	if err := json.Unmarshal(s, &p); err != nil {
		return nil, errors.Wrapf(err, "parsing transient state")
	}

	return &p, nil
}

// getCurrentConfigOnDisk reads and decodes the MachineConfig that was last
// persisted to dn.currentConfigPath by storeCurrentConfigOnDisk.  The raw
// os.Open error (including ENOENT) is propagated so callers can distinguish
// "file missing" from other failures.
func (dn *Daemon) getCurrentConfigOnDisk() (*mcfgv1.MachineConfig, error) {
	f, err := os.Open(dn.currentConfigPath)
	if err != nil {
		return nil, err
	}
	defer f.Close()
	var mc mcfgv1.MachineConfig
	if err := json.NewDecoder(bufio.NewReader(f)).Decode(&mc); err != nil {
		return nil, err
	}
	return &mc, nil
}

// storeCurrentConfigOnDisk serializes the given MachineConfig to JSON and
// atomically writes it to dn.currentConfigPath; the on-disk copy is treated
// as the source of truth on later boots.
func (dn *Daemon) storeCurrentConfigOnDisk(current *mcfgv1.MachineConfig) error {
	data, err := json.Marshal(current)
	if err != nil {
		return err
	}
	return writeFileAtomicallyWithDefaults(dn.currentConfigPath, data)
}

// https://bugzilla.redhat.com/show_bug.cgi?id=1842906
// If we didn't successfully complete -firstboot.service, because
// 4.5 and newer removed the BindsTo=, the service may start downgrading
// things.  At this point we should have already applied all target
// changes, so just rename the file to .bak the same as the -firstboot
// path does.
func upgradeHackFor44AndBelow() error {
	// If the encapsulated MachineConfig is absent (or unreadable), there is
	// nothing to clean up; treat any Stat error as "not present".
	if _, err := os.Stat(constants.MachineConfigEncapsulatedPath); err != nil {
		return nil
	}
	glog.Warningf("Failed to complete machine-config-daemon-firstboot before joining cluster!")
	// Renaming this file away signals completion of the initial MC processing.
	if err := os.Rename(constants.MachineConfigEncapsulatedPath, constants.MachineConfigEncapsulatedBakPath); err != nil {
		return errors.Wrap(err, "failed to rename encapsulated MachineConfig after processing on firstboot")
	}
	return nil
}

// removeIgnitionArtifacts removes artifacts used by Ignition that the MCO
// should no longer use since the machine is up.  Currently this is only the
// systemd preset file written by Ignition; a missing file is not an error.
func removeIgnitionArtifacts() error {
	err := os.Remove(constants.IgnitionSystemdPresetFile)
	if err == nil || os.IsNotExist(err) {
		return nil
	}
	return errors.Wrap(err, "failed to remove Ignition-written systemd preset file")
}

// checkStateOnFirstRun is a core entrypoint for our state machine.
// It determines whether we're in our desired state, or if we're
// transitioning between states, and whether or not we need to update
// to a new state. It also checks if someone jumped on a node before
// the daemon took over.
//
// Some more background in this PR: https://github.com/openshift/machine-config-operator/pull/245
//nolint:gocyclo
func (dn *Daemon) checkStateOnFirstRun() error {
	node, err := dn.loadNodeAnnotations(dn.node)
	if err != nil {
		return err
	}
	// Update our cached copy
	dn.node = node

	// Load the "transient" pending state: which config we were moving to
	// when we last rebooted, plus the boot ID recorded at write time.
	pendingState, err := dn.getPendingState()
	if err != nil {
		return err
	}
	var pendingConfigName, bootID string
	if pendingState != nil {
		pendingConfigName = pendingState.Message
		bootID = pendingState.BootID
	}
	// XXX: drop this
	// we need this compatibility layer for now
	if pendingState == nil {
		// Fall back to the legacy JSON state file if the primary pending
		// state source had nothing.
		legacyPendingState, err := dn.getPendingConfig()
		if err != nil {
			return err
		}
		if legacyPendingState != nil {
			pendingConfigName = legacyPendingState.PendingConfig
			bootID = legacyPendingState.BootID
		}
	}

	state, err := dn.getStateAndConfigs(pendingConfigName)
	if err != nil {
		return err
	}

	// if we have a pendingConfig but we're into the same bootid, we failed to drain or reboot
	// and if we still have a pendingConfig it means we've been killed by kube after 600s
	// take a stab at that and re-run the drain+reboot routine
	if state.pendingConfig != nil && bootID == dn.bootID {
		dn.logSystem("drain interrupted, retrying")
		if err := dn.performDrain(); err != nil {
			return err
		}
		if err := dn.finalizeBeforeReboot(state.pendingConfig); err != nil {
			return err
		}
		// reboot does not return on success; the daemon restarts post-reboot.
		return dn.reboot(fmt.Sprintf("Node will reboot into config %v", state.pendingConfig.GetName()))
	}

	if err := dn.detectEarlySSHAccessesFromBoot(); err != nil {
		return fmt.Errorf("error detecting previous SSH accesses: %v", err)
	}

	// Bootstrapping state is when we have the node annotations file
	if state.bootstrapping {
		// On first boot we may need to pivot the OS to the target image
		// before anything else can proceed.
		targetOSImageURL := state.currentConfig.Spec.OSImageURL
		osMatch := dn.checkOS(targetOSImageURL)
		if !osMatch {
			glog.Infof("Bootstrap pivot required to: %s", targetOSImageURL)
			// This only returns on error
			osImageContentDir, err := ExtractOSImage(targetOSImageURL)
			if err != nil {
				return err
			}
			if err := dn.updateOS(state.currentConfig, osImageContentDir); err != nil {
				return err
			}
			// Clean up the extracted image content before rebooting.
			if err := os.RemoveAll(osImageContentDir); err != nil {
				return err
			}
			if err := dn.finalizeBeforeReboot(state.currentConfig); err != nil {
				return err
			}
			return dn.reboot(fmt.Sprintf("Node will reboot into config %v", state.currentConfig.GetName()))
		}
		glog.Info("No bootstrap pivot required; unlinking bootstrap node annotations")

		// Rename the bootstrap node annotations; the
		// currentConfig's osImageURL should now be *truth*.
		// In other words if it drifts somehow, we go degraded.
		if err := os.Rename(constants.InitialNodeAnnotationsFilePath, constants.InitialNodeAnnotationsBakPath); err != nil {
			return errors.Wrap(err, "renaming initial node annotation file")
		}
	}

	var currentOnDisk *mcfgv1.MachineConfig
	if !state.bootstrapping {
		var err error
		currentOnDisk, err = dn.getCurrentConfigOnDisk()
		// we allow ENOENT for previous MCO versions that don't write this...
		if err != nil && !os.IsNotExist(err) {
			return err
		}
	}

	if currentOnDisk != nil && state.currentConfig.GetName() != currentOnDisk.GetName() {
		// The on disk state (if available) is always considered truth.
		// We want to handle the case where etcd state was restored from a backup.
		dn.logSystem("Disk currentConfig %s overrides node's currentConfig annotation %s", currentOnDisk.GetName(), state.currentConfig.GetName())
		state.currentConfig = currentOnDisk
	}

	// Validate the on-disk state against what we *expect*.
	//
	// In the case where we're booting a node for the first time, or the MCD
	// is restarted, that will be the current config.
	//
	// In the case where we have
	// a pending config, this is where we validate that it actually applied.
	// We currently just do this on startup, but in the future it could e.g. be
	// a once-a-day or week cron job.
	var expectedConfig *mcfgv1.MachineConfig
	if state.pendingConfig != nil {
		glog.Infof("Validating against pending config %s", state.pendingConfig.GetName())
		expectedConfig = state.pendingConfig
	} else {
		glog.Infof("Validating against current config %s", state.currentConfig.GetName())
		expectedConfig = state.currentConfig
	}

	// The force file is an admin escape hatch: skip validation and go
	// straight to an update attempt.
	if forceFileExists() {
		glog.Infof("Skipping on-disk validation; %s present", constants.MachineConfigDaemonForceFile)
		return dn.triggerUpdateWithMachineConfig(state.currentConfig, state.desiredConfig)
	}

	if err := dn.validateOnDiskState(expectedConfig); err != nil {
		// Validation failure (e.g. someone modified files out-of-band) is
		// surfaced both as an event and as the returned error.
		wErr := fmt.Errorf("unexpected on-disk state validating against %s: %w", expectedConfig.GetName(), err)
		dn.recorder.Eventf(getNodeRef(dn.node), corev1.EventTypeWarning, "OnDiskStateValidationFailed", wErr.Error())
		return wErr
	}

	glog.Info("Validated on-disk state")

	// We've validated state. Now, ensure that node is in desired state
	var inDesiredConfig bool
	if inDesiredConfig, err = dn.updateConfigAndState(state); err != nil {
		return err
	}
	if inDesiredConfig {
		return nil
	}

	if dn.recorder != nil {
		dn.recorder.Eventf(getNodeRef(dn.node), corev1.EventTypeNormal, "BootResync", fmt.Sprintf("Booting node %s, currentConfig %s, desiredConfig %s", dn.node.Name, state.currentConfig.GetName(), state.desiredConfig.GetName()))
	}
	// currentConfig != desiredConfig, and we're not booting up into the desiredConfig.
	// Kick off an update.
	return dn.triggerUpdateWithMachineConfig(state.currentConfig, state.desiredConfig)
}

// updateConfigAndState updates node to desired state, labels nodes as done and uncordon.
//
// Returns (true, nil) when the node's currentConfig already matches its
// desiredConfig (possibly after promoting a completed pendingConfig), and
// (false, nil) when an update is still needed.  On error the boolean
// reflects the state reached so far and should not be relied upon.
func (dn *Daemon) updateConfigAndState(state *stateAndConfigs) (bool, error) {
	// In the case where we had a pendingConfig, make that now currentConfig.
	// We update the node annotation, delete the state file, etc.
	if state.pendingConfig != nil {
		if dn.recorder != nil {
			dn.recorder.Eventf(getNodeRef(dn.node), corev1.EventTypeNormal, "NodeDone", fmt.Sprintf("Setting node %s, currentConfig %s to Done", dn.node.Name, state.pendingConfig.GetName()))
		}
		// Mark the node Done for the config we just finished booting into.
		if err := dn.nodeWriter.SetDone(dn.kubeClient.CoreV1().Nodes(), dn.nodeLister, dn.name, state.pendingConfig.GetName()); err != nil {
			return true, errors.Wrap(err, "error setting node's state to Done")
		}
		// Clear the pending state record now that the pending config took effect.
		if out, err := dn.storePendingState(state.pendingConfig, 0); err != nil {
			return true, errors.Wrapf(err, "failed to reset pending config: %s", string(out))
		}

		// The pending config is now the current config.
		state.currentConfig = state.pendingConfig
	}

	if state.bootstrapping {
		// Persist the current config so on-disk state becomes truth for
		// subsequent boots.
		if err := dn.storeCurrentConfigOnDisk(state.currentConfig); err != nil {
			return false, err
		}
	}

	// In case of node reboot, it may be the case that desiredConfig changed while we
	// were coming up, so we next look at that before uncordoning the node (so
	// we don't uncordon and then immediately re-cordon)
	inDesiredConfig := state.currentConfig.GetName() == state.desiredConfig.GetName()
	if inDesiredConfig {
		if state.pendingConfig != nil {
			// Great, we've successfully rebooted for the desired config,
			// let's mark it done!
			glog.Infof("Completing pending config %s", state.pendingConfig.GetName())
			if err := dn.completeUpdate(state.pendingConfig.GetName()); err != nil {
				// Record the failure in the update-state metric before returning.
				MCDUpdateState.WithLabelValues("", err.Error()).SetToCurrentTime()
				return inDesiredConfig, err
			}
		}
		// If we're degraded here, it means we got an error likely on startup and we retried.
		// If that's the case, clear it out.
		if state.state == constants.MachineConfigDaemonStateDegraded {
			if err := dn.nodeWriter.SetDone(dn.kubeClient.CoreV1().Nodes(), dn.nodeLister, dn.name, state.currentConfig.GetName()); err != nil {
				errLabelStr := fmt.Sprintf("error setting node's state to Done: %v", err)
				MCDUpdateState.WithLabelValues("", errLabelStr).SetToCurrentTime()
				return inDesiredConfig, errors.Wrap(err, "error setting node's state to Done")
			}
		}

		glog.Infof("In desired config %s", state.currentConfig.GetName())
		// Success path: record the config we settled on in the metric.
		MCDUpdateState.WithLabelValues(state.currentConfig.GetName(), "").SetToCurrentTime()
	}

	// No errors have occurred. Returns true if currentConfig == desiredConfig, false otherwise (needs update)
	return inDesiredConfig, nil
}

// runOnceFromMachineConfig utilizes a parsed machineConfig and executes in onceFrom
// mode. If the content was remote, it executes cluster calls, otherwise it assumes
// no cluster is present yet.
func (dn *Daemon) runOnceFromMachineConfig(machineConfig mcfgv1.MachineConfig, contentFrom onceFromOrigin) error {
	if contentFrom == onceFromRemoteConfig {
		if dn.kubeClient == nil {
			// Programmer error: remote once-from requires a cluster client.
			panic("running in onceFrom mode with a remote MachineConfig without a cluster")
		}
		// NOTE: This case expects a cluster to exists already.
		current, desired, err := dn.prepUpdateFromCluster()
		if err != nil {
			// Mark the node degraded before bubbling the error up.
			dn.nodeWriter.SetDegraded(err, dn.kubeClient.CoreV1().Nodes(), dn.nodeLister, dn.name)
			return err
		}
		if current == nil || desired == nil {
			// prepUpdateFromCluster signals "no update needed" with nil configs.
			return nil
		}
		// At this point we have verified we need to update
		if err := dn.triggerUpdateWithMachineConfig(current, &machineConfig); err != nil {
			dn.nodeWriter.SetDegraded(err, dn.kubeClient.CoreV1().Nodes(), dn.nodeLister, dn.name)
			return err
		}
		return nil
	}
	if contentFrom == onceFromLocalConfig {
		// Execute update without hitting the cluster
		return dn.update(nil, &machineConfig)
	}
	// Otherwise return an error as the input format is unsupported
	return fmt.Errorf("%v is not a path nor url; can not run once", contentFrom)
}

// runOnceFromIgnition executes MCD's subset of Ignition functionality in onceFrom mode:
// it writes the config's files and systemd units directly, then reboots.
func (dn *Daemon) runOnceFromIgnition(ignConfig ign3types.Config) error {
	// Execute update without hitting the cluster
	if err := dn.writeFiles(ignConfig.Storage.Files); err != nil {
		return err
	}
	if err := dn.writeUnits(ignConfig.Systemd.Units); err != nil {
		return err
	}
	// Unconditionally remove this file in the once-from (classic RHEL)
	// case.  We use this file to suppress things like kubelet and SDN
	// starting on CoreOS during the firstboot/pivot boot, but there's
	// no such thing on classic RHEL.
	if _, err := os.Stat(constants.MachineConfigEncapsulatedPath); err == nil {
		if err := os.Remove(constants.MachineConfigEncapsulatedPath); err != nil {
			return errors.Wrapf(err, "failed to remove %s", constants.MachineConfigEncapsulatedPath)
		}
	}
	return dn.reboot("runOnceFromIgnition complete")
}

// handleNodeEvent is the informer callback for Node add/update events; it
// enqueues the node for processing.  The type assertion intentionally panics
// on unexpected object types, as in the original.
func (dn *Daemon) handleNodeEvent(node interface{}) {
	obj := node.(*corev1.Node)
	glog.V(4).Infof("Updating Node %s", obj.Name)
	dn.enqueueNode(obj)
}

// prepUpdateFromCluster handles the shared update prepping functionality for
// flows that expect the cluster to already be available. Returns true if an
// update is required, false otherwise.
//
// NOTE(review): despite the comment above, the function actually returns
// (current, desired, error); "no update required" is signaled by a nil
// current and desired config with a nil error.
func (dn *Daemon) prepUpdateFromCluster() (*mcfgv1.MachineConfig, *mcfgv1.MachineConfig, error) {
	// Desired config comes from the node annotation (allowNoent=true).
	desiredConfigName, err := getNodeAnnotationExt(dn.node, constants.DesiredMachineConfigAnnotationKey, true)
	if err != nil {
		return nil, nil, err
	}
	desiredConfig, err := dn.mcLister.Get(desiredConfigName)
	if err != nil {
		return nil, nil, err
	}
	// currentConfig is always expected to be there as loadNodeAnnotations
	// is one of the very first calls when the daemon starts.
	currentConfigName, err := getNodeAnnotation(dn.node, constants.CurrentMachineConfigAnnotationKey)
	if err != nil {
		return nil, nil, err
	}
	currentConfig, err := dn.mcLister.Get(currentConfigName)
	if err != nil {
		return nil, nil, err
	}
	state, err := getNodeAnnotation(dn.node, constants.MachineConfigDaemonStateAnnotationKey)
	if err != nil {
		return nil, nil, err
	}

	// On-disk current config is tolerated to be missing (older MCO versions
	// did not write it); any other read error is fatal.
	currentOnDisk, err := dn.getCurrentConfigOnDisk()
	if err != nil && !os.IsNotExist(err) {
		return nil, nil, err
	}

	// If the on-disk config disagrees with the annotation, disk wins: treat
	// it as the current config and report an update is needed.
	if currentOnDisk != nil && currentOnDisk.GetName() != currentConfig.GetName() {
		return currentOnDisk, desiredConfig, nil
	}

	// Detect if there is an update
	if desiredConfigName == currentConfigName && state == constants.MachineConfigDaemonStateDone {
		// No actual update to the config
		glog.V(2).Info("No updating is required")
		return nil, nil, nil
	}
	return currentConfig, desiredConfig, nil
}

// completeUpdate marks the node as schedulable again, then deletes the
// "transient state" file, which signifies that all of those prior steps have
// been completed.
func (dn *Daemon) completeUpdate(desiredConfigName string) error {
	// Uncordon the node first so workloads can return.
	if err := dn.cordonOrUncordonNode(false); err != nil {
		return err
	}

	// Drop the update-in-progress taint that was applied for the update.
	if err := dn.removeUpdateInProgressTaint(context.TODO()); err != nil {
		return err
	}

	dn.logSystem("Update completed for config %s and node has been successfully uncordoned", desiredConfigName)
	dn.recorder.Eventf(getNodeRef(dn.node), corev1.EventTypeNormal, "Uncordon", fmt.Sprintf("Update completed for config %s and node has been uncordoned", desiredConfigName))

	return nil
}

// removeUpdateInProgressTaint removes the NodeUpdateInProgressTaint from this
// node, retrying on conflict.  It is a no-op (no API patch issued) when the
// taint is not present.
func (dn *Daemon) removeUpdateInProgressTaint(ctx context.Context) error {
	return clientretry.RetryOnConflict(commonconstants.NodeUpdateBackoff, func() error {
		oldNode, err := dn.kubeClient.CoreV1().Nodes().Get(ctx, dn.name, metav1.GetOptions{})
		if err != nil {
			return err
		}
		oldData, err := json.Marshal(oldNode)
		if err != nil {
			return err
		}

		newNode := oldNode.DeepCopy()

		// Taints to keep: everything except the update-in-progress taint.
		// (Dropped the redundant else branch after continue.)
		var taintsAfterUpgrade []corev1.Taint
		for _, taint := range newNode.Spec.Taints {
			if taint.MatchTaint(commonconstants.NodeUpdateInProgressTaint) {
				continue
			}
			taintsAfterUpgrade = append(taintsAfterUpgrade, taint)
		}
		// updateInProgress taint is not there, so no need to patch the node object, return immediately
		if len(taintsAfterUpgrade) == len(newNode.Spec.Taints) {
			return nil
		}
		// Remove the NodeUpdateInProgressTaint.
		newNode.Spec.Taints = taintsAfterUpgrade
		newData, err := json.Marshal(newNode)
		if err != nil {
			return err
		}

		// Build a strategic merge patch from the old/new serializations so we
		// only touch the taints field.
		patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, corev1.Node{})
		if err != nil {
			// %w keeps the underlying error inspectable via errors.Is/As.
			return fmt.Errorf("failed to create patch for node %q: %w", dn.name, err)
		}
		_, err = dn.kubeClient.CoreV1().Nodes().Patch(ctx, dn.name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
		return err
	})
}

// triggerUpdateWithMachineConfig starts the update. It queries the cluster for
// the current and desired config if they weren't passed.
func (dn *Daemon) triggerUpdateWithMachineConfig(currentConfig, desiredConfig *mcfgv1.MachineConfig) error {
	// Resolve the current config from the node annotation when the caller
	// didn't supply one.
	if currentConfig == nil {
		ccAnnotation, err := getNodeAnnotation(dn.node, constants.CurrentMachineConfigAnnotationKey)
		if err != nil {
			return err
		}
		currentConfig, err = dn.mcLister.Get(ccAnnotation)
		if err != nil {
			return err
		}
	}

	// Same for the desired config.
	if desiredConfig == nil {
		dcAnnotation, err := getNodeAnnotation(dn.node, constants.DesiredMachineConfigAnnotationKey)
		if err != nil {
			return err
		}
		desiredConfig, err = dn.mcLister.Get(dcAnnotation)
		if err != nil {
			return err
		}
	}

	// Shut down the Config Drift Monitor since we'll be performing an update
	// and the config will "drift" while the update is occurring.
	dn.stopConfigDriftMonitor()

	// run the update process. this function doesn't currently return.
	return dn.update(currentConfig, desiredConfig)
}

// validateOnDiskState compares the on-disk state against what a configuration
// specifies.  If for example an admin ssh'd into a node, or another operator
// is stomping on our files, we want to highlight that and mark the system
// degraded.
func (dn *Daemon) validateOnDiskState(currentConfig *mcfgv1.MachineConfig) error {
	// Be sure we're booted into the OS we expect
	if !dn.checkOS(currentConfig.Spec.OSImageURL) {
		return errors.Errorf("expected target osImageURL %q, have %q", currentConfig.Spec.OSImageURL, dn.bootedOSImageURL)
	}

	// Delegate file/unit comparison to the package-level helper.
	return validateOnDiskState(currentConfig, pathSystemd)
}

// compareOSImageURL checks whether the current and desired
// URL are the same.  This used to do more, but now the
// only special casing is to support an empty desired URL
// as meaning "keep current OS" which we probably don't need
// anymore either.
func compareOSImageURL(current, desired string) bool {
	// An empty desired URL means "keep whatever we're booted into".
	if desired == "" {
		return true
	}
	return current == desired
}

// checkOS determines whether the booted system matches the target
// osImageURL and if not whether we need to take action.  This function
// returns `true` if no action is required, which is the case if we're
// not running RHCOS or FCOS, or if the target osImageURL is "" (unspecified),
// or if the digests match.
// Otherwise if `false` is returned, then we need to perform an update.
func (dn *Daemon) checkOS(osImageURL string) bool {
	// On a CoreOS variant, compare booted vs. target image URLs.
	if dn.os.IsCoreOSVariant() {
		return compareOSImageURL(dn.bootedOSImageURL, osImageURL)
	}
	// Non-CoreOS hosts never pivot; nothing to do.
	glog.Infof(`Not booted into a CoreOS variant, ignoring target OSImageURL %s`, osImageURL)
	return true
}

// Close closes all the connections the node agent has open for its lifetime.
// Currently a no-op.
func (dn *Daemon) Close() {
}

// ValidPath attempts to see if the path provided is indeed an acceptable
// filesystem path. This function does not check if the path exists.
// A path is accepted when it is absolute ("/...") or starts with "."
// (which covers "./...", "../...", and any dotfile-style prefix).
func ValidPath(path string) bool {
	// NOTE: the previous version also listed ".." as a prefix, but that was
	// redundant — any string starting with ".." also starts with ".".
	for _, validStart := range []string{".", "/"} {
		if strings.HasPrefix(path, validStart) {
			return true
		}
	}
	return false
}

// senseAndLoadOnceFrom gets a hold of the content for supported onceFrom configurations,
// parses to verify the type, and returns back the genericInterface, the type description,
// if it was local or remote, and error.
func (dn *Daemon) senseAndLoadOnceFrom(onceFrom string) (interface{}, onceFromOrigin, error) {
	var (
		content     []byte
		contentFrom onceFromOrigin
	)
	// Read the content from a remote endpoint if requested
	/* #nosec */
	if strings.HasPrefix(onceFrom, "http://") || strings.HasPrefix(onceFrom, "https://") {
		contentFrom = onceFromRemoteConfig
		// NOTE(review): http.Get uses the default client with no timeout;
		// a hung server would block here indefinitely — consider a client
		// with a timeout if this matters for once-from usage.
		resp, err := http.Get(onceFrom)
		if err != nil {
			return nil, contentFrom, err
		}
		defer resp.Body.Close()
		// Read the body content from the request
		content, err = ioutil.ReadAll(resp.Body)
		if err != nil {
			return nil, contentFrom, err
		}

	} else {
		// Otherwise read it from a local file
		contentFrom = onceFromLocalConfig
		absoluteOnceFrom, err := filepath.Abs(filepath.Clean(onceFrom))
		if err != nil {
			return nil, contentFrom, err
		}
		content, err = ioutil.ReadFile(absoluteOnceFrom)
		if err != nil {
			return nil, contentFrom, err
		}
	}

	// Try each supported parser
	ignConfig, err := ctrlcommon.ParseAndConvertConfig(content)
	if err == nil && ignConfig.Ignition.Version != "" {
		glog.V(2).Info("onceFrom file is of type Ignition")
		return ignConfig, contentFrom, nil
	}

	glog.V(2).Infof("%s is not an Ignition config: %v\nTrying MachineConfig.", onceFrom, err)

	// Try to parse as a machine config
	mc, err := mcoResourceRead.ReadMachineConfigV1(content)
	if err == nil && mc != nil {
		glog.V(2).Info("onceFrom file is of type MachineConfig")
		return *mc, contentFrom, nil
	}

	// Neither parser accepted the content; report the last parse error.
	return nil, onceFromUnknownConfig, fmt.Errorf("unable to decipher onceFrom config type: %v", err)
}

// isSingleNodeTopology reports whether the given control-plane topology is
// the single-replica (single-node) topology.
func isSingleNodeTopology(topology configv1.TopologyMode) bool {
	switch topology {
	case configv1.SingleReplicaTopologyMode:
		return true
	default:
		return false
	}
}

// getControlPlaneTopology reads from node annotation and returns
// controlPlaneTopology value set in the cluster. If annotation value
// is unrecognized, we consider it as a highly available cluster.
func (dn *Daemon) getControlPlaneTopology() configv1.TopologyMode {
	annotated := configv1.TopologyMode(dn.node.Annotations[constants.ClusterControlPlaneTopologyAnnotationKey])
	if annotated == configv1.SingleReplicaTopologyMode {
		return configv1.SingleReplicaTopologyMode
	}
	// HighlyAvailable, empty, and any unrecognized value all map to HA.
	return configv1.HighlyAvailableTopologyMode
}

// forceFileExists determines if /run/machine-config-daemon-force is present.
// Any Stat error (ENOENT or otherwise) is treated as "not present".
func forceFileExists() bool {
	// Idiomatic form of "stat succeeded => file exists" (staticcheck S1008).
	_, err := os.Stat(constants.MachineConfigDaemonForceFile)
	return err == nil
}
